/**
 * 
 */
package com.opensource.netty.redis.proxy.zk.registry.listen;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import com.opensource.netty.redis.proxy.commons.constants.RedisConstants;
import com.opensource.netty.redis.proxy.commons.utils.StringUtils;
import com.opensource.netty.redis.proxy.core.config.LBRedisServerMasterCluster;
import com.opensource.netty.redis.proxy.core.config.RedisPoolConfig;
import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerBean;
import com.opensource.netty.redis.proxy.core.config.support.LBRedisServerClusterBean;
import com.opensource.netty.redis.proxy.core.listen.impl.AbstractRegistryListenImpl;
import com.opensource.netty.redis.proxy.core.log.impl.LoggerUtils;
import com.opensource.netty.redis.proxy.net.client.LBRedisClient;
import com.opensource.netty.redis.proxy.pool.commons.LBRedisProxyPoolConfig;

/**
 * @author liubing
 *
 */
public class ZookeeperRegistryListen extends AbstractRegistryListenImpl {
	
	
	public ZookeeperRegistryListen(LBRedisServerMasterCluster ffanRedisServerMasterCluster) {
		super(ffanRedisServerMasterCluster);
	}

	/* (non-Javadoc)
	 * 监控主数据的变化
	 * @see com.wanda.ffan.redis.proxy.core.listen.IRegistryListen#handleDataChange(java.lang.String, java.lang.Object)
	 */
	@Override
	public void handleDataChange(String dataPath, Object data) {
		int index=dataPath.lastIndexOf(RedisConstants.PATH_SEPARATOR);
		dataPath=dataPath.substring(index+1, dataPath.length());
		dataPath=dataPath.replace(RedisConstants.PROTOCOL_SEPARATOR, RedisConstants.SEPERATOR_ACCESS_LOG);
		StringBuffer sbBuffer=new StringBuffer();
		sbBuffer.append(RedisConstants.REDIS_PROXY).append(dataPath);
		JSONObject jsonObject=JSONObject.parseObject(String.valueOf(data));
		updateRedisMasterBean(sbBuffer.toString(), jsonObject);		
	}
	
	
	/**
	 * 更改指定的主
	 * @param key
	 * @param ffanRedisMasterBean
	 */
	private void updateRedisMasterBean(String key,JSONObject jsonObject){
		LBRedisServerBean newRedisMasterBean  =new LBRedisServerBean();
		List<LBRedisServerBean> redisServerMasterBeans=new ArrayList<LBRedisServerBean>();
		for(LBRedisServerBean redisServerBean :super.getFfanRedisServerMasterCluster().getMasters()){
			if(redisServerBean.getKey().equals(key)){
				RedisPoolConfig redisPoolConfig=redisServerBean.getRedisPoolConfig();
				newRedisMasterBean.setHost(jsonObject.getString("host"));
				newRedisMasterBean.setPort(jsonObject.getIntValue("port"));
				newRedisMasterBean.setRedisPoolConfig(redisPoolConfig);
				redisServerMasterBeans.add(newRedisMasterBean);
				//break;
			}else{
				redisServerMasterBeans.add(redisServerBean);
			}
		}
		super.getFfanRedisServerMasterCluster().setMasters(redisServerMasterBeans);
		//--------------------------------------------------------------------------
		for(LBRedisServerClusterBean ffanRedisServerClusterBean:super.getFfanRedisServerMasterCluster().getRedisServerClusterBeans()){
			if(ffanRedisServerClusterBean.getRedisServerMasterBean().getKey().equals(key)){
				ffanRedisServerClusterBean.setRedisServerMasterBean(newRedisMasterBean);
				break;
			}
		}
		//--------------------------------------------------------------------------
		if(super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().containsKey(key)){
			LBRedisServerClusterBean ffanRedisServerClusterBean=super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().get(key);
			if(ffanRedisServerClusterBean.getRedisServerMasterBean().getKey().equals(key)){
				ffanRedisServerClusterBean.setRedisServerMasterBean(newRedisMasterBean);
			}
		}
		//--------------------------------------------------------------------------
		if(!super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().containsKey(newRedisMasterBean.getKey())){
			//初始化新的客户端
			  LBRedisProxyPoolConfig lbRedisProxyPoolConfig=convertLBRedisProxyPoolConfig(newRedisMasterBean);
			  LBRedisClient ffanRedisClient=new LBRedisClient(lbRedisProxyPoolConfig);
			  super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().put(newRedisMasterBean.getKey(), ffanRedisClient);
		}
		if(super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().containsKey(key)){
			super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().get(key).close();
			super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().remove(key);
		}
	}
	
	/**
	 * 监控主对应从的变化
	 */
	@Override
	public void handleChildChange(String parentPath,String data,List<String> childPaths, Map<String, String> slaveMap) {
		if(StringUtils.isBlank(data)){
			LoggerUtils.warn("this master had been deleted,path:"+parentPath);
		}else{
			if(childPaths==null||childPaths.size()==0){//有主节点，无从节点
				int index=parentPath.lastIndexOf(RedisConstants.PATH_SEPARATOR);
				parentPath=parentPath.substring(index+1, parentPath.length());
				parentPath=parentPath.replace(RedisConstants.PROTOCOL_SEPARATOR, RedisConstants.SEPERATOR_ACCESS_LOG);
				StringBuffer sbBuffer=new StringBuffer();
				sbBuffer.append(RedisConstants.REDIS_PROXY).append(parentPath);
				super.getFfanRedisServerMasterCluster().removeSlavesByMaster(sbBuffer.toString());
			}else{//有主有从节点
				Map<String, JSONObject> jsonSlaveDatas=new HashMap<String, JSONObject>();
				for(String childData:childPaths){//childData 格式 IP:port
					childData=childData.replace(RedisConstants.PROTOCOL_SEPARATOR, RedisConstants.SEPERATOR_ACCESS_LOG);
					StringBuffer sbBuffer=new StringBuffer();
					sbBuffer.append(RedisConstants.REDIS_PROXY).append(parentPath);					
					JSONObject jsonObject=JSONObject.parseObject(slaveMap.get(childData));
					jsonSlaveDatas.put(sbBuffer.toString(), jsonObject);
				}
				updateFfanRedisClusterBeans(parentPath, jsonSlaveDatas);
			}
		}
	}
	
	
	/**
	 * 更新指定主的从
	 * @param parentKey 父节点路径
	 * @param jsonSlaveDatas 子节点路径对应值
	 */
	private void updateFfanRedisClusterBeans(String parentKey,Map<String, JSONObject> jsonSlaveDatas){
		
		operateSlaveMasterClusters(parentKey, jsonSlaveDatas);
			//---------------------------------------------------------------------------------------------
		operateRedisServerClusterBeanMap(parentKey, jsonSlaveDatas);
			//------------------------------------------------------------------------------------------------
		operateRedisServerClusterBeans(parentKey, jsonSlaveDatas);
			//------------------------------------------------------------------------------------------------
		operateRedisClientBeanMap(parentKey);
	}	
	
	private void operateRedisClientBeanMap(String parentKey){
		List<LBRedisServerBean> redisServerSlaveBeans= super.getFfanRedisServerMasterCluster().getMasterClusters().get(parentKey);
		
		if(redisServerSlaveBeans!=null&&redisServerSlaveBeans.size()>0){
			for(LBRedisServerBean lbRedisServerBean:redisServerSlaveBeans){
				if(!super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().containsKey(lbRedisServerBean.getKey())){
					LBRedisProxyPoolConfig lbRedisProxyPoolConfig=convertLBRedisProxyPoolConfig(lbRedisServerBean);
					LBRedisClient ffanRedisClient=new LBRedisClient(lbRedisProxyPoolConfig);
					super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().put(lbRedisServerBean.getKey(), ffanRedisClient);
				}
			}
		}
		
	}
	
	private void operateSlaveMasterClusters(String parentKey,Map<String, JSONObject> jsonSlaveDatas){
		List<LBRedisServerBean> redisServerSlaveBeans=super.getFfanRedisServerMasterCluster().getMasterClusters().get(parentKey);
		if(redisServerSlaveBeans!=null){
			List<LBRedisServerBean> newRedisServerSlaveBeans=new ArrayList<LBRedisServerBean>();
			for(LBRedisServerBean ffanRedisServerBean:redisServerSlaveBeans){			
				if(jsonSlaveDatas.containsKey(ffanRedisServerBean.getKey())){//不包含
					JSONObject jsonObject=jsonSlaveDatas.get(ffanRedisServerBean.getKey());
					LBRedisServerBean newRedisMasterBean  =new LBRedisServerBean();
					RedisPoolConfig redisPoolConfig=ffanRedisServerBean.getRedisPoolConfig();
					newRedisMasterBean.setHost(jsonObject.getString("host"));
					newRedisMasterBean.setPort(jsonObject.getIntValue("port"));
					newRedisMasterBean.setWeight(jsonObject.getIntValue("weight"));
					newRedisMasterBean.setRedisPoolConfig(redisPoolConfig);
					newRedisServerSlaveBeans.add(newRedisMasterBean);
				}
			}
			if(newRedisServerSlaveBeans.size()>0){//具有值
				super.getFfanRedisServerMasterCluster().getMasterClusters().put(parentKey, newRedisServerSlaveBeans);
			}else{//无任何值
				super.getFfanRedisServerMasterCluster().getMasterClusters().remove(parentKey);
			}
		}
	}
	
	private void operateRedisServerClusterBeanMap(String parentKey,Map<String, JSONObject> jsonSlaveDatas){
		if(super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().containsKey(parentKey)){
			LBRedisServerClusterBean ffanRedisServerClusterBean=super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().get(parentKey);				
			if(ffanRedisServerClusterBean.getRedisServerMasterBean().getKey().equals(parentKey)&&ffanRedisServerClusterBean.getRedisServerSlaveBeans()!=null&&ffanRedisServerClusterBean.getRedisServerSlaveBeans().size()>0){//父节点相同
				 List<LBRedisServerBean> redisServerSlaveBeans1=ffanRedisServerClusterBean.getRedisServerSlaveBeans();
				 List<LBRedisServerBean> newRedisServerSlaveBeans1=new ArrayList<LBRedisServerBean>();
					for(LBRedisServerBean ffanRedisServerBean:redisServerSlaveBeans1){				
						if(jsonSlaveDatas.containsKey(ffanRedisServerBean.getKey())){//包含
							JSONObject jsonObject=jsonSlaveDatas.get(ffanRedisServerBean.getKey());
							LBRedisServerBean newRedisMasterBean  =new LBRedisServerBean();
							RedisPoolConfig redisPoolConfig=ffanRedisServerBean.getRedisPoolConfig();
							newRedisMasterBean.setHost(jsonObject.getString("host"));
							newRedisMasterBean.setPort(jsonObject.getIntValue("port"));
							newRedisMasterBean.setWeight(jsonObject.getIntValue("weight"));
							newRedisMasterBean.setRedisPoolConfig(redisPoolConfig);
							newRedisServerSlaveBeans1.add(newRedisMasterBean);
						}
					}
				if(newRedisServerSlaveBeans1.size()>0){//具有值
						super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().get(parentKey).setRedisServerSlaveBeans(newRedisServerSlaveBeans1);
				}else{//无任何值
						super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().get(parentKey).getRedisServerSlaveBeans().clear();
				}	
			}
	   }
	}
	
	private void operateRedisServerClusterBeans(String parentKey,Map<String, JSONObject> jsonSlaveDatas){
		if(super.getFfanRedisServerMasterCluster().getRedisServerClusterBeans()!=null&&super.getFfanRedisServerMasterCluster().getRedisServerClusterBeans().size()>0){
			for(LBRedisServerClusterBean ffanRedisServerClusterBean:super.getFfanRedisServerMasterCluster().getRedisServerClusterBeans()){
				if(ffanRedisServerClusterBean.getRedisServerMasterBean().getKey().equals(parentKey)&&ffanRedisServerClusterBean.getRedisServerSlaveBeans()!=null&&ffanRedisServerClusterBean.getRedisServerSlaveBeans().size()>0){//父节点相同
					 List<LBRedisServerBean> redisServerSlaveBeans2=ffanRedisServerClusterBean.getRedisServerSlaveBeans();
					 List<LBRedisServerBean> newRedisServerSlaveBeans2=new ArrayList<LBRedisServerBean>();
						for(LBRedisServerBean ffanRedisServerBean:redisServerSlaveBeans2){//删除挂掉的从				
							if(jsonSlaveDatas.containsKey(ffanRedisServerBean.getKey())){//不包含
								JSONObject jsonObject=jsonSlaveDatas.get(ffanRedisServerBean.getKey());
								LBRedisServerBean newRedisMasterBean  =new LBRedisServerBean();
								RedisPoolConfig redisPoolConfig=ffanRedisServerBean.getRedisPoolConfig();
								newRedisMasterBean.setHost(jsonObject.getString("host"));
								newRedisMasterBean.setPort(jsonObject.getIntValue("port"));
								newRedisMasterBean.setWeight(jsonObject.getIntValue("weight"));
								newRedisMasterBean.setRedisPoolConfig(redisPoolConfig);
								newRedisServerSlaveBeans2.add(newRedisMasterBean);
							}
						}
					if(newRedisServerSlaveBeans2.size()>0){//具有值
						ffanRedisServerClusterBean.setRedisServerSlaveBeans(newRedisServerSlaveBeans2);
					}else{//无任何值
						ffanRedisServerClusterBean.getRedisServerSlaveBeans().clear();
					}	
				}
			}
		}
	}

	 /**
	   * 转换
	   * @param lbRedisServerBean
	   * @return
	   */
	  private LBRedisProxyPoolConfig convertLBRedisProxyPoolConfig(LBRedisServerBean lbRedisServerBean){
		  LBRedisProxyPoolConfig lbRedisProxyPoolConfig=new LBRedisProxyPoolConfig();
		  RedisPoolConfig redisPoolConfig= lbRedisServerBean.getRedisPoolConfig();
		  lbRedisProxyPoolConfig.setConnectionTimeout(redisPoolConfig.getConnectionTimeout());
		  lbRedisProxyPoolConfig.setHost(lbRedisServerBean.getHost());
		  lbRedisProxyPoolConfig.setInitialEntries(redisPoolConfig.getInitialConnection());
		  lbRedisProxyPoolConfig.setMaxActiveEntries(redisPoolConfig.getMaxActiveConnection());
		  lbRedisProxyPoolConfig.setMaxWaitMillisOnBorrow(redisPoolConfig.getMaxWaitMillisOnBorrow());
		  lbRedisProxyPoolConfig.setMinActiveEntries(redisPoolConfig.getMinConnection());
		  lbRedisProxyPoolConfig.setMinEvictableIdleTimeMillis(redisPoolConfig.getMinEvictableIdleTimeMillis());
		  lbRedisProxyPoolConfig.setMinIdleEntries(redisPoolConfig.getMinIdleEntries());
		  lbRedisProxyPoolConfig.setPort(lbRedisServerBean.getPort());
		  lbRedisProxyPoolConfig.setTestOnBorrow(redisPoolConfig.isTestOnBorrow());
		  lbRedisProxyPoolConfig.setTestOnReturn(redisPoolConfig.isTestOnReturn());
		  lbRedisProxyPoolConfig.setTestWhileIdle(redisPoolConfig.isTestWhileIdle());
		  lbRedisProxyPoolConfig.setTimeBetweenEvictionRunsMillis(redisPoolConfig.getTimeBetweenEvictionRunsMillis());
		  return lbRedisProxyPoolConfig;
	  }

	@Override
	public void handleSlaveDataChange(String dataPath, Object data) {
		int index=dataPath.lastIndexOf(RedisConstants.PATH_SEPARATOR);
		dataPath=dataPath.substring(index+1, dataPath.length());
		dataPath=dataPath.replace(RedisConstants.PROTOCOL_SEPARATOR, RedisConstants.SEPERATOR_ACCESS_LOG);
		StringBuffer sbBuffer=new StringBuffer();
		sbBuffer.append(RedisConstants.REDIS_PROXY).append(dataPath);
		JSONObject jsonObject=JSONObject.parseObject(String.valueOf(data));
		updateRedisSlaveBean(sbBuffer.toString(), jsonObject);		
	}
	
	
	/**
	 * 更改指定的从
	 * @param key
	 * @param ffanRedisMasterBean
	 */
	private void updateRedisSlaveBean(String key,JSONObject jsonObject){
		LBRedisServerBean newRedisSlaveBean  =new LBRedisServerBean();

		for(LBRedisServerClusterBean ffanRedisServerClusterBean:super.getFfanRedisServerMasterCluster().getRedisServerClusterBeans()){
			List<LBRedisServerBean> redisServerSlaveBeans=new ArrayList<LBRedisServerBean>();
			for(LBRedisServerBean redisServerBean:ffanRedisServerClusterBean.getRedisServerSlaveBeans()){
				if(redisServerBean.getKey().equals(key)){
					RedisPoolConfig redisPoolConfig=redisServerBean.getRedisPoolConfig();
					newRedisSlaveBean.setHost(jsonObject.getString("host"));
					newRedisSlaveBean.setPort(jsonObject.getIntValue("port"));
					newRedisSlaveBean.setWeight(jsonObject.getIntValue("weight"));
					newRedisSlaveBean.setRedisPoolConfig(redisPoolConfig);
					redisServerSlaveBeans.add(newRedisSlaveBean);
				}else{
					redisServerSlaveBeans.add(redisServerBean);
				}
			}
			ffanRedisServerClusterBean.setRedisServerSlaveBeans(redisServerSlaveBeans);
		}
		
		//--------------------------------------------------------------------------
		for(String key1:super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().keySet()){
			LBRedisServerClusterBean ffanRedisServerClusterBean=super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().get(key1);
			if(ffanRedisServerClusterBean!=null){
				List<LBRedisServerBean> redisServerSlaveBeans=new ArrayList<LBRedisServerBean>();
				for(LBRedisServerBean redisServerBean:ffanRedisServerClusterBean.getRedisServerSlaveBeans()){
					if(redisServerBean.getKey().equals(key)){
						RedisPoolConfig redisPoolConfig=redisServerBean.getRedisPoolConfig();
						newRedisSlaveBean.setHost(jsonObject.getString("host"));
						newRedisSlaveBean.setPort(jsonObject.getIntValue("port"));
						newRedisSlaveBean.setWeight(jsonObject.getIntValue("weight"));
						newRedisSlaveBean.setRedisPoolConfig(redisPoolConfig);
						redisServerSlaveBeans.add(newRedisSlaveBean);
					}else{
						redisServerSlaveBeans.add(redisServerBean);
					}
				}
				ffanRedisServerClusterBean.setRedisServerSlaveBeans(redisServerSlaveBeans);
				super.getFfanRedisServerMasterCluster().getRedisServerClusterBeanMap().put(key1, ffanRedisServerClusterBean);
			}
			
		}
		
		//--------------------------------------------------------------------------
		if(!super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().containsKey(newRedisSlaveBean.getKey())){
			//初始化新的客户端
			  LBRedisProxyPoolConfig lbRedisProxyPoolConfig=convertLBRedisProxyPoolConfig(newRedisSlaveBean);
			  LBRedisClient ffanRedisClient=new LBRedisClient(lbRedisProxyPoolConfig);
			  super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().put(newRedisSlaveBean.getKey(), ffanRedisClient);
		}
		if(super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().containsKey(key)){
			super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().get(key).close();
			super.getFfanRedisServerMasterCluster().getRedisClientBeanMap().remove(key);
		}
		//-----------------------------------------------------------------------------
		for(String key2:super.getFfanRedisServerMasterCluster().getMasterClusters().keySet()){
			List<LBRedisServerBean> slaveServerBeans=super.getFfanRedisServerMasterCluster().getMasterClusters().get(key2);
			if(slaveServerBeans!=null){
				List<LBRedisServerBean> redisServerSlaveBeans=new ArrayList<LBRedisServerBean>();
				for(LBRedisServerBean redisServerBean:slaveServerBeans){
					if(redisServerBean.getKey().equals(key)){
						RedisPoolConfig redisPoolConfig=redisServerBean.getRedisPoolConfig();
						newRedisSlaveBean.setHost(jsonObject.getString("host"));
						newRedisSlaveBean.setPort(jsonObject.getIntValue("port"));
						newRedisSlaveBean.setWeight(jsonObject.getIntValue("weight"));
						newRedisSlaveBean.setRedisPoolConfig(redisPoolConfig);
						redisServerSlaveBeans.add(newRedisSlaveBean);
					}else{
						redisServerSlaveBeans.add(redisServerBean);
					}
				}
				super.getFfanRedisServerMasterCluster().getMasterClusters().put(key2, redisServerSlaveBeans);
			}
			
			
		}
		
	}
}
